feat: RPC fallback for missing DAG definitions on workers #2271

Merged
yohamta0 merged 4 commits into dagucloud:main from four-flames:feat/2259-rpc-fallback-dag-definitions
Jun 11, 2026
@four-bytes-robby four-bytes-robby commented Jun 7, 2026

Contributor

Closes #2259

Problem

When a worker executes a sub-DAG via dag.run and the DAG YAML is not yet present in its local store, the step fails. This creates a race condition in git-synced setups:

  1. New DAG YAML is pushed to the coordinator
  2. Sub-DAG is triggered before the worker's local git-sync catches up
  3. The step fails unnecessarily

Solution

Workers fetch missing DAG definitions from the coordinator on demand via a new GetDAG RPC as a fallback when the local store misses.

  • Primary source: local DAG store (unchanged)
  • Fallback: RPC call to coordinator for DAG YAML retrieval
  • Error handling: if coordinator also does not have it → fail gracefully

Changes

  • New gRPC GetDAG RPC in coordinator proto with GetDAGRequest/GetDAGResponse
  • Coordinator handler: dagStore field + GetDAG implementation (returns YAML spec by name, returns Unimplemented when dagStore is nil)
  • Coordinator client: GetDAG(ctx, name) method added to Client interface + implementation
  • Agent/dbClient: RemoteDAGLoader function type wired through agent options → dbClient.GetDAG() tries local-first, then RPC fallback on miss
  • Worker remote_handler: closure using coordinator client GetDAG + spec.LoadYAML to parse remote YAML
  • cmd/coord.go + context.go: DAGStore wired into coordinator startup

Data Flow

Worker sub-DAG → dbClient.GetDAG(name)
  ├─ [Primary] Local DAGStore.GetDetails(name) → ok? return
  └─ [Fallback] RemoteDAGLoader(ctx, name)
       ├─ coordinatorClient.GetDAG [gRPC] → coordinator: dagStore.GetSpec(name)
       └─ spec.LoadYAML(yaml) → *core.DAG
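
The flow above can be sketched in a few lines of Go. This is a minimal illustration, not the code from the PR: the `DAG` struct, `errNotFound`, `fetchSpec`, `newRemoteLoader`, `resolve`, and `lookup` are hypothetical stand-ins, the `RemoteDAGLoader` signature is assumed, and the real parsing step (`spec.LoadYAML`) is replaced by simply wrapping the YAML text.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// DAG stands in for *core.DAG; the real type carries far more fields.
type DAG struct {
	Name string
	Spec string
}

// errNotFound is a hypothetical sentinel for "no such DAG".
var errNotFound = errors.New("dag not found")

// RemoteDAGLoader mirrors the function type named in the PR; the signature is assumed.
type RemoteDAGLoader func(ctx context.Context, name string) (*DAG, error)

// coordinatorSpecs plays the role of the coordinator's DAG store (dagStore.GetSpec).
var coordinatorSpecs = map[string]string{
	"child": "steps:\n  - command: echo hello\n",
}

// fetchSpec stands in for coordinatorClient.GetDAG over gRPC.
func fetchSpec(_ context.Context, name string) (string, error) {
	spec, ok := coordinatorSpecs[name]
	if !ok {
		return "", errNotFound
	}
	return spec, nil
}

// newRemoteLoader builds the worker-side closure: fetch the YAML, then parse it.
func newRemoteLoader() RemoteDAGLoader {
	return func(ctx context.Context, name string) (*DAG, error) {
		yaml, err := fetchSpec(ctx, name)
		if err != nil {
			return nil, err
		}
		if yaml == "" {
			return nil, nil // an empty result is treated as "no DAG"
		}
		// The real code parses with spec.LoadYAML; this sketch only wraps the text.
		return &DAG{Name: name, Spec: yaml}, nil
	}
}

// resolve is the local-first lookup: local store first, loader only on a miss.
func resolve(ctx context.Context, local map[string]*DAG, remote RemoteDAGLoader, name string) (*DAG, error) {
	if dag, ok := local[name]; ok {
		return dag, nil
	}
	if remote == nil {
		return nil, errNotFound // no coordinator client: behavior is unchanged
	}
	dag, err := remote(ctx, name)
	if err != nil {
		return nil, err
	}
	if dag == nil {
		return nil, errNotFound
	}
	return dag, nil
}

// lookup runs resolve against a worker that has only "parent" synced locally.
func lookup(name string) (*DAG, error) {
	local := map[string]*DAG{"parent": {Name: "parent"}}
	return resolve(context.Background(), local, newRemoteLoader(), name)
}

func main() {
	for _, name := range []string{"parent", "child", "ghost"} {
		dag, err := lookup(name)
		fmt.Printf("%s: found=%t err=%v\n", name, dag != nil, err)
	}
}
```

Here "parent" is served locally, "child" comes from the simulated coordinator, and "ghost" fails on both sides, which corresponds to the "fail gracefully" case in the Solution list.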

Testing

  • go build ./... — clean
  • go vet ./... — clean (no new issues)
  • ✅ Unit tests pass: agent, coordinator, worker, cmd, frontend/api/v1
  • 🔜 Pending: integration test on real distributed coordinator+worker setup

Notes

  • Feature is purely additive — dagStore on coordinator is optional (returns Unimplemented if nil)
  • RemoteDAGLoader defaults to nil → no fallback attempted on workers without a coordinator client
  • All existing code paths are untouched; fallback only triggers after local lookup fails

Summary by cubic

Adds an RPC fallback so workers can fetch missing DAG YAML from the coordinator when local DAGs aren’t synced yet, preventing sub-DAG failures. Implements a local-first lookup that only falls back on not-found and works even when a worker has no local DAGStore (Linear #2259).

  • New Features

    • Added gRPC GetDAG to the coordinator (GetDAGRequest/GetDAGResponse) to serve DAG YAML (Unimplemented if no store).
    • Extended client with GetDAG; workers try local-first and fall back via agent RemoteDAGLoader in remote_handler using spec.LoadYAML.
    • Wired DAGStore into command context and coordinator startup (startall) so RPCs can read DAG definitions.
  • Bug Fixes

    • Guarded nil worker DAGStore and restricted fallback to exec.ErrDAGNotFound; on remote failure or empty, return the original local error; tests added.
    • Eliminated flakiness in chatbridge batcher tests by adding a synchronous bucket flush and using it in tests.
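
The guarded fallback described in the first bug-fix bullet might look roughly like the sketch below. It assumes a sentinel comparable to `exec.ErrDAGNotFound` (here the hypothetical `errDAGNotFound`), and the `dagStore` interface, `fixedStore`, `errDisk`, and `run` helper are invented for illustration; only the branching logic follows the summary.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// DAG stands in for *core.DAG.
type DAG struct{ Name string }

var (
	errDAGNotFound = errors.New("dag not found") // stand-in for exec.ErrDAGNotFound
	errDisk        = errors.New("disk failure")  // hypothetical non-not-found error
)

// dagStore is an assumed, trimmed-down view of the worker's local store.
type dagStore interface {
	GetDetails(ctx context.Context, name string) (*DAG, error)
}

type remoteDAGLoader func(ctx context.Context, name string) (*DAG, error)

type dbClient struct {
	ds     dagStore        // may be nil on workers without a local store
	remote remoteDAGLoader // may be nil when no coordinator client exists
}

// GetDAG falls back to the remote loader only for a nil store or a not-found miss.
func (c *dbClient) GetDAG(ctx context.Context, name string) (*DAG, error) {
	localErr := errDAGNotFound // a nil store counts as a local miss
	if c.ds != nil {
		dag, err := c.ds.GetDetails(ctx, name)
		if err == nil {
			return dag, nil
		}
		if !errors.Is(err, errDAGNotFound) {
			return nil, err // other local errors are returned immediately
		}
		localErr = err
	}
	if c.remote == nil {
		return nil, localErr
	}
	dag, remoteErr := c.remote(ctx, name)
	if remoteErr != nil || dag == nil {
		return nil, localErr // remote failure or empty result: keep the local error
	}
	return dag, nil
}

// fixedStore always answers with the same DAG or error.
type fixedStore struct {
	dag *DAG
	err error
}

func (s fixedStore) GetDetails(context.Context, string) (*DAG, error) { return s.dag, s.err }

// run executes one scenario and reports how often the remote loader was called.
func run(ds dagStore, remoteHas bool) (dag *DAG, calls int, err error) {
	c := &dbClient{ds: ds, remote: func(_ context.Context, name string) (*DAG, error) {
		calls++
		if remoteHas {
			return &DAG{Name: name}, nil
		}
		return nil, errDAGNotFound
	}}
	dag, err = c.GetDAG(context.Background(), "child")
	return dag, calls, err
}

func main() {
	_, calls, err := run(fixedStore{err: errDisk}, true)
	fmt.Println("disk failure:", calls, err)
	dag, calls, err := run(nil, true)
	fmt.Println("nil store:", dag != nil, calls, err)
}
```

The point of restricting the fallback with `errors.Is` is that an unrelated local failure, such as an unreadable file, is not masked by a successful remote fetch.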

Written for commit 033ebb9. Summary will update on new commits.


Summary by CodeRabbit

  • New Features
    • Workers can now retrieve DAG definitions from the coordinator when local definitions are unavailable, improving sub-workflow execution reliability.

coderabbitai Bot commented Jun 7, 2026


Walkthrough

This PR implements remote DAG definition fetching as a fallback for workers executing sub-DAGs. Workers now query the coordinator for missing DAG YAML via RPC when local store lookups fail, enabling graceful sub-DAG execution in git-synced environments where DAG definitions may not yet be present locally.

Changes

Remote DAG Fallback Flow

| Layer | File(s) | Summary |
| --- | --- | --- |
| RPC protocol contract | proto/coordinator/v1/coordinator.proto | CoordinatorService gains GetDAG RPC accepting GetDAGRequest (DAG name) and returning GetDAGResponse (raw YAML spec and optional error string). |
| Coordinator handler implementation | internal/service/coordinator/handler.go | Handler and HandlerConfig add dagStore field. NewHandler wires the store. New GetDAG RPC returns Unimplemented when store is nil; otherwise fetches spec via dagStore.GetSpec and returns it in the response or error string on failure. |
| Coordinator context and DAG store initialization | internal/cmd/context.go, internal/cmd/coord.go, internal/cmd/startall.go | Context adds DAGStore field propagated by WithContext. NewContext initializes store via cmdprocess.NewDAGStore and assigns to context. runCoordinator and startall pass ctx.DAGStore to newCoordinator, which extends signature and wires store into handler config. |
| Coordinator client GetDAG method | internal/service/coordinator/client.go | Client interface gains GetDAG(ctx, name) (string, error). Implementation discovers coordinators, issues gRPC GetDAG RPC with retry logic, and returns spec on success; rejects nil/empty responses and propagates coordinator errors. |
| Agent remote loader interface and setup | internal/runtime/agent/agent.go | Defines exported RemoteDAGLoader function type. Agent.Options gains RemoteDAGLoader field; Agent struct stores it. New() assigns the loader. Both Run() and dryRun() pass loader to database client initialization. |
| Agent dbclient remote DAG fallback implementation | internal/runtime/agent/dbclient.go | dbClient and newDBClient carry optional remoteDAGLoader dependency. GetDAG implements fallback: logs local miss, attempts remote load if configured, returns remote DAG if available; otherwise returns original local error. Adds logging imports. |
| Worker remote loader integration | internal/service/worker/remote_handler.go | executeDAGRun constructs remoteDAGLoader closure fetching YAML from coordinator via coordinatorClient.GetDAG, treating empty results as nil and parsing via spec.LoadYAML. Wires loader into node.SubWorkflowRunnerFactory via RemoteDAGLoader field. |
| Test mock implementations | internal/runtime/agent/dbclient_test.go, internal/service/frontend/api/v1/workers_internal_test.go, internal/service/worker/coordreport/status_pusher_test.go, internal/service/worker/poller_test.go, internal/service/worker/remote_handler_test.go | Updated mocks across test files to implement GetDAG method. remote_handler_test mocks gain GetDAGFunc hook for test override. Agent tests updated to pass nil remote loader argument to newDBClient. |

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

  • dagucloud/dagu#2188: Coordinates with the DAG store setup surface added to internal/cmd/context.go, which now relies on cmdprocess.NewDAGStore for initialization.

  • dagucloud/dagu#2258: Addresses the same sub-DAG resolution problem by fixing nil-pointer/error handling in local DAG lookup, complementing this PR's remote fallback.

  • dagucloud/dagu#2248: Modifies the same executeDAGRun sub-workflow runner factory wiring that this PR extends with remote loader capability.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 44.44% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title accurately reflects the main change: introducing an RPC fallback mechanism for workers to fetch missing DAG definitions from the coordinator. |
| Linked Issues check | ✅ Passed | All objectives from issue #2259 are met: local-first lookup with RPC fallback implemented, graceful error handling, optional feature, and relevant files show complete implementation. |
| Out of Scope Changes check | ✅ Passed | All changes are directly aligned with implementing the RPC fallback feature; no out-of-scope modifications detected across the changeset. |
| Description check | ✅ Passed | The pull request description is comprehensive and well-structured, covering problem statement, solution, changes, data flow, testing status, and implementation notes. |


@four-bytes-robby four-bytes-robby force-pushed the feat/2259-rpc-fallback-dag-definitions branch from 329b0ee to ffb7e07 Compare June 7, 2026 10:04


Adds GetDAG RPC to coordinator service allowing workers to fetch missing
DAG definitions on demand when the local DAG store misses. This eliminates
race conditions in git-synced setups where new DAG YAML may not yet be
present on the worker.

- New gRPC: GetDAG RPC in coordinator proto (GetDAGRequest/GetDAGResponse)
- Coordinator: dagStore field + GetDAG handler (returns YAML spec by name)
- Client: GetDAG(ctx, name) method on coordinator.Client
- Agent: RemoteDAGLoader func type + option, wired through dbClient
- dbClient: local-first lookup with RPC fallback on miss
- Worker: RemoteDAGLoader closure using coordinator client GetDAG
- Context: DAGStore field added, populated at init, passed to coordinator
@four-bytes-robby four-bytes-robby force-pushed the feat/2259-rpc-fallback-dag-definitions branch from ffb7e07 to 87f00ae Compare June 7, 2026 10:14
@four-bytes-robby four-bytes-robby marked this pull request as ready for review June 10, 2026 15:16
@four-bytes-robby

Contributor Author

This has been running without regressions for a few days now, so I think it is safe to merge.

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (1)
internal/runtime/agent/dbclient_test.go (1)

51-51: ⚡ Quick win

Add focused tests for new GetDAG fallback branches.

These updates only adapt constructor calls, but the new GetDAG path now has important branches (local hit, local miss + remote hit, remote nil, remote error, nil local store). A table-driven test for those cases would lock down behavior and catch regressions quickly.

Also applies to: 80-80, 109-109, 136-136, 156-156
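
A table covering the five branches named above could be shaped like the following sketch. It is written as a plain program so that it stays self-contained; in a real `_test.go` file the loop body would move into `t.Run` subtests. The `lookup` type, the condensed `getDAG`, and the `hit`/`fail` helpers are hypothetical and only approximate `dbClient.GetDAG`.

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errNotFound = errors.New("dag not found")           // hypothetical not-found sentinel
	errRemote   = errors.New("coordinator unreachable") // hypothetical remote failure
)

// lookup models both the local store and the remote loader as one function type.
type lookup func(name string) (string, error)

// getDAG is a condensed stand-in for dbClient.GetDAG with the guarded fallback.
func getDAG(local, remote lookup, name string) (string, error) {
	localErr := errNotFound
	if local != nil {
		spec, err := local(name)
		if err == nil {
			return spec, nil
		}
		if !errors.Is(err, errNotFound) {
			return "", err
		}
		localErr = err
	}
	if remote == nil {
		return "", localErr
	}
	spec, err := remote(name)
	if err != nil || spec == "" {
		return "", localErr
	}
	return spec, nil
}

func hit(spec string) lookup { return func(string) (string, error) { return spec, nil } }
func fail(err error) lookup  { return func(string) (string, error) { return "", err } }

type testCase struct {
	name          string
	local, remote lookup
	wantSpec      string
	wantErr       error
}

var cases = []testCase{
	{"local hit", hit("local"), hit("remote"), "local", nil},
	{"local miss, remote hit", fail(errNotFound), hit("remote"), "remote", nil},
	{"remote returns nothing", fail(errNotFound), hit(""), "", errNotFound},
	{"remote error", fail(errNotFound), fail(errRemote), "", errNotFound},
	{"nil local store", nil, hit("remote"), "remote", nil},
}

// runCases returns a description of every case whose result differs from the table.
func runCases() []string {
	var failures []string
	for _, tc := range cases {
		spec, err := getDAG(tc.local, tc.remote, "child")
		if spec != tc.wantSpec || !errors.Is(err, tc.wantErr) {
			failures = append(failures, fmt.Sprintf("%s: got (%q, %v)", tc.name, spec, err))
		}
	}
	return failures
}

func main() {
	fmt.Println("failing cases:", len(runCases()))
}
```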

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/runtime/agent/dbclient_test.go` at line 51, Add table-driven unit
tests covering the new GetDAG fallback branches for dbClient created via
newDBClient (the cases: local hit, local miss + remote hit, remote returns nil,
remote returns error, and nil local store). For each case, construct
mockDAGStore and mockDAGRunStore behaviors and assertions verifying which store
was called and what GetDAG returns; use a looped test table to assert expected
results and error conditions for dbClient.GetDAG to prevent regressions across
the constructor-call updates referenced around newDBClient and GetDAG.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@internal/runtime/agent/dbclient.go`:
- Around line 32-57: Guard against a nil DAG store and only fall back to the
remote loader for true "not found" local misses: first check if o.ds == nil and
treat that as a local-miss case (log and invoke o.remoteDAGLoader if present),
otherwise call o.ds.GetDetails(ctx, name) and, if the error is nil, return the DAG;
if it returns an error, do NOT blanket-fallback — if the error is a not-found
error (use errors.Is(err, <store-not-found-error>), e.g., dagstore.ErrNotFound
or the package-specific not-found sentinel) then attempt o.remoteDAGLoader (if
o.remoteDAGLoader == nil return the original err), but for any other local error
return it immediately; ensure you still log successful remote loads and remote
loader failures using logger and tag.DAG(name)/tag.Error(remoteErr).

In `@internal/service/coordinator/client.go`:
- Around line 1021-1037: The callback passed to attemptCall returns nil on
transport success even when the coordinator response contains a logical error in
resp.Error, preventing retries; modify the anonymous callback used with
attemptCall (the one calling client.client.GetDAG in client.go) to check resp
after the RPC: if resp is nil or resp.Error != "" return a non-nil error
(include resp.Error in the returned error) instead of nil so attemptCall sees
the failure and continues to other members; keep the existing transport error
handling for callErr unchanged.
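
The retry concern in the client.go comment can be illustrated with a small sketch. The `attemptCall` shown here is an assumed simplification (try each member in order, stop at the first nil error), and `getDAGResponse`, `member`, and `demoMembers` are hypothetical stand-ins for the generated message and coordinator members; the real helper may behave differently.

```go
package main

import (
	"errors"
	"fmt"
)

// getDAGResponse stands in for the generated GetDAGResponse message.
type getDAGResponse struct {
	Spec  string
	Error string
}

// member is a hypothetical coordinator endpoint that answers GetDAG.
type member struct {
	name string
	call func(dagName string) (*getDAGResponse, error)
}

// attemptCall tries each member in order and stops at the first nil error.
func attemptCall(members []member, fn func(m member) error) error {
	lastErr := errors.New("no coordinators available")
	for _, m := range members {
		err := fn(m)
		if err == nil {
			return nil
		}
		lastErr = fmt.Errorf("%s: %w", m.name, err)
	}
	return lastErr
}

// getDAG reports logical failures from inside the callback so the loop keeps going.
func getDAG(members []member, dagName string) (string, error) {
	var spec string
	err := attemptCall(members, func(m member) error {
		resp, callErr := m.call(dagName)
		if callErr != nil {
			return callErr // transport failure, handled as before
		}
		if resp == nil {
			return errors.New("empty GetDAG response")
		}
		if resp.Error != "" {
			return errors.New(resp.Error) // logical failure: try the next member
		}
		spec = resp.Spec
		return nil
	})
	if err != nil {
		return "", err
	}
	return spec, nil
}

// demoMembers returns one coordinator without the DAG and one that has it.
func demoMembers() []member {
	return []member{
		{name: "coord-a", call: func(string) (*getDAGResponse, error) {
			return &getDAGResponse{Error: "dag not found"}, nil
		}},
		{name: "coord-b", call: func(string) (*getDAGResponse, error) {
			return &getDAGResponse{Spec: "steps: []\n"}, nil
		}},
	}
}

func main() {
	spec, err := getDAG(demoMembers(), "child")
	fmt.Printf("spec=%q err=%v\n", spec, err)
}
```

If the callback returned nil for the `resp.Error` case, the loop would stop at coord-a and the caller would see only that member's logical error, which is the behavior the comment asks to avoid.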

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 26413624-c695-4f37-8995-ec7dbf15f667

📥 Commits

Reviewing files that changed from the base of the PR and between 109b4f2 and 87f00ae.

⛔ Files ignored due to path filters (3)
  • proto/coordinator/v1/coordinator.pb.go is excluded by !**/*.pb.go
  • proto/coordinator/v1/coordinator_grpc.pb.go is excluded by !**/*.pb.go
  • proto/coordinator/v1/coordinator_protoopaque.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (14)
  • internal/cmd/context.go
  • internal/cmd/coord.go
  • internal/cmd/startall.go
  • internal/runtime/agent/agent.go
  • internal/runtime/agent/dbclient.go
  • internal/runtime/agent/dbclient_test.go
  • internal/service/coordinator/client.go
  • internal/service/coordinator/handler.go
  • internal/service/frontend/api/v1/workers_internal_test.go
  • internal/service/worker/coordreport/status_pusher_test.go
  • internal/service/worker/poller_test.go
  • internal/service/worker/remote_handler.go
  • internal/service/worker/remote_handler_test.go
  • proto/coordinator/v1/coordinator.proto

Comment thread internal/runtime/agent/dbclient.go
Comment thread internal/service/coordinator/client.go Outdated

@cubic-dev-ai cubic-dev-ai Bot left a comment


1 issue found across 17 files


Comment thread internal/runtime/agent/dbclient.go

@yohamta0 yohamta0 left a comment

Collaborator

LGTM 🚀🚀🚀 Thank you very much for implementing this!

@yohamta0 yohamta0 merged commit baf655e into dagucloud:main Jun 11, 2026
11 checks passed